package mqtt

import (
	"errors"
	"gitee.com/squbirreland/imgo"
	"gitee.com/squbirreland/imgo/mqtt/agreement"
	"gitee.com/squbirreland/imgo/mqtt/agreement/constantHeader"
	"gitee.com/squbirreland/imgo/mqtt/agreement/payload"
	"gitee.com/squbirreland/imgo/mqtt/agreement/variableHeader"
	"gitee.com/squbirreland/imgo/mqtt/constant"
	"log"
	"net"
	"reflect"
)

type mqttManagement struct {
}

func NewMqttManagement() *mqttManagement {
	return &mqttManagement{}
}

func (m *mqttManagement) NewConnInterceptor(conn *net.Conn) *net.Conn {
	logger.Info().Println((*conn).RemoteAddr(), " in ")
	return conn
}

func (m *mqttManagement) NewWorkerInterceptor(worker *imgo.WorkRoutine) *imgo.WorkRoutine {
	return worker
}

func (m *mqttManagement) ConnCloser(conn *net.Conn) {
	defer func() {
		err := recover()
		if err != nil {
			log.Println(err)
		}
	}()

	logger.Info().Println("|-x-<", (*conn).RemoteAddr().String(), " out ")
	err := (*conn).Close()
	if err != nil {
		log.Println(err.Error())
	}
	conn = nil
}

func (m *mqttManagement) Parse(From *net.Conn, OriginMsg *[]byte, flag int) (To []*net.Conn, Msg *[]byte) {
	To = make([]*net.Conn, 1)
	To[0] = From
	Msg = OriginMsg
	if flag == 1 {
		return To, Msg
	}
	//解析
	packet, err := parseMqtt(OriginMsg)
	if err != nil {
		log.Println(err.Error())
		return nil, nil
	}
	logger.Debug().Printf(" -- parsed : CH : %v\n", packet.ConstantHeader)
	logger.Debug().Printf(" -- parsed : VH : %v\n", packet.VariableHeader)
	logger.Debug().Printf(" -- parsed : PL : %v\n", packet.Payload)
	//逻辑处理
	m.service(packet)
	//相应
	result, err := createResult(packet)
	if err != nil {
		log.Println(err.Error())
		return nil, nil
	}
	Msg = &result
	return
}

func parseMqtt(OriginMsg *[]byte) (packet *agreement.Packet, err error) {
	packet = new(agreement.Packet)
	//通过固定头的解析 获得该报文对象 与固定头长度 剩余长度
	cHeader := constantHeader.ConstantHeader{}
	cHeader.SetHeaderBytes(*OriginMsg)
	err = cHeader.Parse()
	if err != nil {
		return nil, err
	}
	packet.ConstantHeader = cHeader
	//创建可变头部
	var vHeader variableHeader.IVariableHeader = nil
	//判断头部
	if cHeader.Control == constant.CONNECT {
		vHeader = &variableHeader.Connect{}
	} else if cHeader.Control == constant.PUBLISH {
		vHeader = &variableHeader.Publish{}
	} else if cHeader.Control == constant.PINGREQ {
		logger.Info().Println(" -- heartbeat request")
	} else if cHeader.Control == constant.DISCONNECT {
		return nil, errors.New(" client disconnect normally ")
	} else {
		return nil, errors.New(" sorry have not variable head parser for this . ")
	}
	variableHeaderLength := 0
	//如果有可变头 解析
	if vHeader != nil {
		vHeader.SetHeaderBytes((*OriginMsg)[cHeader.GetLength():])
		err := vHeader.Parse(cHeader)
		if err != nil {
			return nil, err
		}
		packet.VariableHeader = vHeader
		variableHeaderLength = vHeader.GetLength()
	}
	//payload
	pb := (*OriginMsg)[cHeader.GetLength()+variableHeaderLength:]
	packet.Payload = &payload.DefaultPayload{}
	packet.Payload.SetPayloadBytes(pb)
	return packet, nil
}

func createResult(fromPacket *agreement.Packet) ([]byte, error) {
	fromCHeader := fromPacket.ConstantHeader
	//相应构造
	var packet *agreement.Packet
	if fromCHeader.Control == constant.CONNECT {
		z := constant.ZoneBit4{RETAIN: false, QOS: constant.OnlySendOnce, DUP: false}
		c := constantHeader.NewConstantHeader(constant.CONNACK, z, 2)
		packet = agreement.NewPacket(*c, nil)
		//反射获取请求链接的属性 当前vHeader为interface 所以使用value.Elem()
		connFlag := reflect.ValueOf(fromPacket.VariableHeader).Elem().FieldByName("ConnFlag")
		cleanSession := connFlag.FieldByName("CleanSession").Bool()
		var sp variableHeader.SessionPresent
		if cleanSession {
			sp = variableHeader.SessionUnSave
		} else {
			sp = variableHeader.SessionSaved
		}
		//构造返回请求的报文对象
		packet.VariableHeader = &variableHeader.ConnAck{
			ConnectAcknowledgeFlags: sp,
			ConnectReturnCode:       variableHeader.ConnAccept,
		}
	} else if fromCHeader.Control == constant.PINGREQ {
		z := constant.ZoneBit4{RETAIN: true, QOS: constant.OnceLessArrive, DUP: true}
		c := constantHeader.NewConstantHeader(constant.PINGRESP, z, 0)
		packet = agreement.NewPacket(*c, nil)
	} else if fromCHeader.Control == constant.PUBLISH {
		z := constant.ZoneBit4{RETAIN: true, QOS: constant.OnceLessArrive, DUP: true}
		c := constantHeader.NewConstantHeader(constant.PUBACK, z, 2)
		packet = agreement.NewPacket(*c, nil)
		//
		identifierV := reflect.ValueOf(fromPacket.VariableHeader).Elem().FieldByName("Identifier").Int()
		packet.VariableHeader = &variableHeader.PubAck{
			Identifier: variableHeader.PacketIdentifier(identifierV),
		}
	} else {
		return nil, errors.New(" sorry have not result generator for this . ")
	}
	//创建固定头
	err := packet.ConstantHeader.GenerateBytes()
	if err != nil {
		return nil, err
	}
	header := packet.ConstantHeader.GetHeaderBytes()
	//创建可变头
	if packet.VariableHeader != nil {
		err := packet.VariableHeader.CreateBytes(fromCHeader)
		if err != nil {
			return nil, err
		}
		header = append(header, packet.VariableHeader.GetHeaderBytes()...)
	}
	return header, nil
}

func (m *mqttManagement) service(packet *agreement.Packet) {
	control := packet.ConstantHeader.Control
	if control == constant.PUBLISH {
		Publish(m, packet)
	}
}
